-- Export one day (dt = '20230809') of device events for biz_type 299212 / 306804 / 600358
-- into bi_ads.ads_log_device_export_csv_out.
--
-- The partition spec fixes dt statically and takes biz_type dynamically from the last
-- column of the select list; columns are matched to the target table by position.
--
-- Every "union all" branch maps one source to the common layout
--   device_id, code, value, event_from, event_id, status, event_time, event_detail, biz_type
-- and tags its rows with a constant event_id (1/2, 3, 4, 5, 6, 7, 8, 10, 9 in that order).
-- Each branch is inner-joined to bi_dw.dim_device_basic_info_item_df to pick up biz_type
-- and to keep only devices of the three exported biz_types.
insert overwrite table bi_ads.ads_log_device_export_csv_out
partition (dt = '20230809', biz_type)
select /*+ repartition(1) */  -- Spark hint: shuffle the result into a single partition before writing
    device_id
    , event_from
    , event_id
    , status
    -- A 10-character event_time gets '000' appended and is cast to bigint
    -- (presumably seconds -> milliseconds; TODO confirm). Other values pass through unchanged.
    , if(
        length(cast(event_time as string)) = 10
        , cast(concat(cast(event_time as string), '000') as bigint)
        , event_time
      ) as event_time
    , code
    , value
    -- , event_detail   -- computed by every branch but currently not exported
    , biz_type
from (
    -- event_id 1 / 2: ONLINE / OFFLINE rows of ods_clear_smart_device_event.
    select
        a.dev_id as device_id
        , '' as code
        , '' as value
        , 1 as event_from
        , if(a.event_type = 'ONLINE', 1, 2) as event_id
        , 1 as status
        , coalesce(time, 0) as event_time
        , if(a.event_type = 'OFFLINE', reason, '') as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_smart_device_event a
    join bi_dw.dim_device_basic_info_item_df b
        on a.dev_id = b.dev_id
        and a.dt = '20230809'
        and b.dt = '20230809'
        and b.biz_type in ('299212', '306804', '600358')
        and a.event_type in ('ONLINE', 'OFFLINE')

    union all

    -- event_id 3: rows of ods_clear_smartapollobizdata_gateway_active.
    select
        a.id as device_id
        , '' as code
        , '' as value
        , 1 as event_from
        , 3 as event_id
        , 1 as status
        , a.active_time as event_time
        , '' as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_smartapollobizdata_gateway_active a
    join bi_dw.dim_device_basic_info_item_df b
        on a.id = b.dev_id
    where a.dt = '20230809'
        and b.dt = '20230809'
        and b.biz_type in ('299212', '306804', '600358')

    union all

    -- event_id 4: rows of ods_clear_smartapollobizdata_gateway_reset.
    select
        a.gw_id as device_id
        , '' as code
        , '' as value
        , coalesce(a.reason, 1) as event_from
        , 4 as event_id
        , 1 as status
        -- second '-'-separated token of row_key is used as the event time
        , split(a.row_key, '-')[1] as event_time
        , '' as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_smartapollobizdata_gateway_reset a
    join bi_dw.dim_device_basic_info_item_df b
        on a.gw_id = b.dev_id
    where a.dt = '20230809'
        and b.dt = '20230809'
        and b.biz_type in ('299212', '306804', '600358')

    union all

    -- event_id 5: rows of ods_clear_datapoint_publish; event_from is derived from a.reason.
    select
        a.subid as device_id
        , c.code
        , a.dp_value as value
        , case
            when a.reason = 'voice_control' then 3
            when a.reason = 'rule' then 4
            when a.reason = 'app' then 2
            else 1
          end as event_from
        , 5 as event_id
        , 1 as status
        , a.dp_time as event_time
        , '' as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_datapoint_publish a
    join (
        select
            dev_id
            , biz_type
            , product_id
        from bi_dw.dim_device_basic_info_item_df
        where dt = '20230809'
            and biz_type in ('299212', '306804', '600358')
    ) b
        on a.subid = b.dev_id
        and a.dt = '20230809'
    left join (
        -- Distinct datapoint definitions with status = 1, paired with the id of the
        -- product(s) that share their schema_id; used to resolve dp_id -> code.
        select
            b.id as product_id
            , a.dp_id
            , a.code
            , a.name
            , a.mode
        from bi_ods.ods_smart_schema_datapoint a
        left join bi_ods.ods_smart_product b
            on a.schema_id = b.schema_id
            and b.dt = '20230809'
        where a.dt = '20230809'
            and a.status = 1
            -- and a.selected = 1   (filter currently disabled)
        group by b.id, a.dp_id, a.code, a.name, a.mode
    ) c
        on c.product_id = b.product_id
        and c.dp_id = a.dp_id

    union all

    -- event_id 6: rows of ods_clear_apollobizdata_device_upgrade_all.
    select
        a.dev_id as device_id
        , '' as code
        , '' as value
        -- NULL must be tested with IS [NOT] NULL: "= null" / "!= null" evaluate to UNKNOWN,
        -- which made the upgrade_type + 7 branch unreachable.
        , case
            when a.log_type in ('device_upgrade_begin', 'upgrade_device_info')
                and a.upgrade_type is not null
                then (a.upgrade_type + 7)
            when a.log_type in ('device_upgrade_begin', 'upgrade_device_info')
                and a.upgrade_type is null
                then 2
            when a.log_type in (
                    'device_upgrade_status'
                    , 'device_upgrade_url'
                    , 'device_upgrade_progress'
                    , 'device_upgrade_trigger'
                    , 'device_upgrade_version'
                    , 'device_upgrade_inactive_connect'
                )
                then 1
            else 2
          end as event_from
        , 6 as event_id
        , 1 as status
        , cast(to_timestamp(timestamp) as bigint) as event_time
        , '' as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_apollobizdata_device_upgrade_all a
    join bi_dw.dim_device_basic_info_item_df b
        on a.dev_id = b.dev_id
    where a.dt = '20230809'
        and b.dt = '20230809'
        and b.biz_type in ('299212', '306804', '600358')

    union all

    -- event_id 7: rows of ods_clear_datapoint_report.
    select
        a.dev_id as device_id
        , c.code
        , a.dp_value as value
        -- Disabled alternative that derived event_from from c.mode:
        -- , case when c.mode = 'ro' then '1'
        --        when c.mode = 'rw' then '2'
        --        else '-1' end as event_from
        , 1 as event_from
        , 7 as event_id
        , 1 as status
        , coalesce(dp_time, ts) as event_time
        , '' as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_datapoint_report a
    join (
        select
            dev_id
            , biz_type
            , product_id
        from bi_dw.dim_device_basic_info_item_df
        where dt = '20230809'
            and biz_type in ('299212', '306804', '600358')
    ) b
        on a.dev_id = b.dev_id
        and a.dt = '20230809'
    left join (
        -- Same datapoint-definition lookup as in the event_id 5 branch.
        select
            b.id as product_id
            , a.dp_id
            , a.code
            , a.name
            , a.mode
        from bi_ods.ods_smart_schema_datapoint a
        left join bi_ods.ods_smart_product b
            on a.schema_id = b.schema_id
            and b.dt = '20230809'
        where a.dt = '20230809'
            and a.status = 1
            -- and a.selected = 1   (filter currently disabled)
        group by b.id, a.dp_id, a.code, a.name, a.mode
    ) c
        -- NOTE(review): this branch matches on a.pid, whereas the event_id 5 branch
        -- matches on b.product_id; confirm the difference is intended.
        on c.product_id = a.pid
        and c.dp_id = a.dp_id

    union all

    -- event_id 8: 'runstat' log records parsed out of the hades analyzer JSON pipeline.
    select
        a.dev_id as device_id
        , '' as code
        , '' as value
        , 1 as event_from
        , 8 as event_id
        , 1 as status
        , a.event_time as event_time
        , a.event_detail as event_detail
        , b.biz_type
    from (
        select
            get_json_object(jsondata, '$.gwId') as dev_id
            -- takes the text between the outermost braces of reportData and reads '$.t' from it
            , get_json_object(
                regexp_extract(get_json_object(jsondata, '$.reportData'), '^\\{(.+)\\}$', 1)
                , '$.t'
              ) as event_time
            , get_json_object(jsondata, '$.reportData') as event_detail
        from bi_ods_log.ods_log_data_analysis_hades_analyzer_data_pipline
        where dt = '20230809'
            and get_json_object(jsondata, '$.logType') = 'runstat'
    ) a
    join bi_dw.dim_device_basic_info_item_df b
        on a.dev_id = b.dev_id
        and b.dt = '20230809'
        and b.biz_type in ('299212', '306804', '600358')

    union all

    -- event_id 10: records of the sigma facade JSON pipeline, keyed by $.fields.devId.
    select
        get_json_object(a.jsondata, '$.fields.devId') as device_id
        , '' as code
        , '' as value
        , -1 as event_from
        , 10 as event_id
        , 1 as status
        , get_json_object(a.jsondata, '$.fields.gmtCreate') as event_time
        , get_json_object(a.jsondata, '$.msgJson') as event_detail
        , b.biz_type
    from bi_ods_log.ods_log_data_analysis_sigma_facade_biz_data_pipline a
    join (
        select
            dev_id
            , biz_type
            , product_id
        from bi_dw.dim_device_basic_info_item_df
        where dt = '20230809'
            and biz_type in ('299212', '306804', '600358')
    ) b
        on get_json_object(a.jsondata, '$.fields.devId') = b.dev_id
        and a.dt = '20230809'

    union all

    -- event_id 9: rows of ods_clear_hadesanalyzerdata_device_restart.
    select
        a.gw_id as device_id
        , '' as code
        , '' as value
        , 1 as event_from
        , 9 as event_id
        , 1 as status
        , a.ts as event_time
        , a.report_data as event_detail
        , b.biz_type
    from bi_ods_clear.ods_clear_hadesanalyzerdata_device_restart a
    join (
        select
            dev_id
            , biz_type
            , product_id
        from bi_dw.dim_device_basic_info_item_df
        where dt = '20230809'
            and biz_type in ('299212', '306804', '600358')
    ) b
        on a.gw_id = b.dev_id
        and a.dt = '20230809'
) t
# Launch the spark-sql CLI on YARN with explicit driver/executor sizing and Spark conf overrides.
spark-sql \
  --master yarn \
  --queue default \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 6g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf spark.driver.cores=2 \
  --conf spark.eventLog.enabled=false \
  --conf spark.sql.broadcastTimeout=600000 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.blacklist.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.sql.autoBroadcastJoinThreshold=1G
分析原因:
因为 spark.sql.adaptive.enabled=true 开启了自适应查询执行(自动调优),同时 spark.sql.autoBroadcastJoinThreshold 被设置得很大(上面的命令中为 1G),
小于该阈值的表会被自动广播(autoBroadcast);将 spark.sql.autoBroadcastJoinThreshold 设为 -1 关闭自动广播就可以了。
如果资源充足,也可以保留自动广播:增加 driver 内存,并把 spark.sql.autoBroadcastJoinThreshold 调整到合适的大小,还可以根据需要开启 spark.broadcast.compress=true。
在多表关联的查询中,要谨慎开启 spark.sql.adaptive.enabled=true。
文章评论